// Copyright (c) 2020-present,  INSPUR Co, Ltd.  All rights reserved.
// This source code is licensed under Apache 2.0 License.

#pragma once
#ifndef OS_WIN
#include <sys/mman.h>
#endif
#ifdef ROCKSDB_MALLOC_USABLE_SIZE
#ifdef OS_FREEBSD
#include <malloc_np.h>
#else
#include <malloc.h>
#endif
#endif

#include <atomic>
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <list>
#include <unordered_set>
#include <vector>

#include "arena_handle_event.h"
#include "pure_mem/rowex_art.h"
#include "pure_mem/rangearena/thread_safe_queue.h"
#include "pure_mem/rangearena/thread_safe_sorted_list.h"
#include "util/allocator.h"
#include "util/mutexlock.h"

// RangeArena maintains the memory block's bookkeeping parameters and the
// data stored in the block. (Original authors: liliupeng, guowenhao.)
namespace rocksdb {
typedef void* KeyHandle;

class RangeArena {
 public:
  RangeArena(size_t block_size, size_t list_size, const Slice& startkey,
             const Slice& endkey, AllocTracker* tracker);

  ~RangeArena();

  void freeBlock(){
    assert(current_block_state_.load() == MEMORY_BLOCK_IS_DELETING);
    delete[] sorted_resevered_kv_;
    sorted_resevered_kv_ = nullptr;
  }

  // memory allocate
  bool Allocate(size_t bytes, void* node, char** buf);

  size_t CurrentMemBlockMaxSize() const { return current_mem_block_max_size_; }
//  size_t CurrentBlockKvSize() const { return current_block_kv_size_; }
//  size_t CurrentBlockDeleteSize() const { return current_block_deleted_size_; }
//  bool IsNormalState() {
//    return current_block_state_.load() == MEMORY_BLOCK_OK;
//  }

  bool IsNormal() const{
      return this->current_block_state_.load() == MEMORY_BLOCK_OK
             || this->current_block_state_.load() == MEMORY_BLOCK_IS_INITING;
  }

  char* allocAtTmpBlock(size_t bytes, void* node){
      char* addr = new char[bytes];
      assert (addr < sorted_resevered_kv_ || addr >= sorted_resevered_kv_ + kSortedReservedKVSize_);
      kv_tmp_list.insertHead(node);
      current_block_kv_size_.fetch_add(bytes);
      allocate_times.fetch_add(1);
      return addr;
  }

  size_t IncreaseCurrentDeleteSize(size_t bytes) {
    this->current_block_deleted_size_.fetch_add(bytes);
    return this->current_block_deleted_size_.load();
  }

  void ChangeStatus(MemBlockState state) {
    MemBlockState s = current_block_state_.load();
    this->current_block_state_.compare_exchange_strong(s, state);
  }

  bool IsNotAllowedInsert() {
    return current_block_state_.load() == MEMORY_BLOCK_IS_NO_INSERT;
  }

  bool ChangeStatusOK2Handling() {
    MemBlockState s = MEMORY_BLOCK_OK;
    return this->current_block_state_.compare_exchange_strong(s, MEMORY_BLOCK_IS_HANDLING);
  }

  const Slice& CurrentBlockKeyEnd() const { return current_block_key_end_; }

  const Slice& CurrentBlockKeyStart() const { return current_block_key_start_; }

  AtomicLinkedList<void*>& Block2Node() { return kv_tmp_list; }

  AtomicLinkedList<void*>& NodePush() { return node_push_list; }

  void AddData2OKList(uint32_t length, const char* handle);

  void Dump() const {
    std::cout << "point:" << this <<",status: "<< current_block_state_.load()
              << ", start_Key:" << current_block_key_start_.ToString(true)
              << ", end_Key:" << current_block_key_end_.ToString(true)
              <<", block head:" << (void*)sorted_resevered_kv_
              << ", max size:" << current_mem_block_max_size_
              << ",cur_size: " << current_block_kv_size_
              <<", offset ok:" << data_ok_offset_.load()
              <<", kv_tmp_list:"<< kv_tmp_list.size()
              <<", allocat tmp:" << allocate_times.load()
              <<", tmp ok times:" << allocate_ok_times.load() << std::endl;
  }

  struct AddKVEvent {
    AddKVEvent(size_t length, const char* khandle)
        : encoded_len(length), handle(khandle) {}
    AddKVEvent(){
        encoded_len = 0;
        handle = nullptr;
    }
    size_t encoded_len;
    const char* handle;
    int compare(AddKVEvent& other){
        return (uint64_t)(this->handle) - (uint64_t)(other.handle);
    }
  };

  Status GetNeedFlushData(const char* &entry, size_t &kv_flush_last_size);
  bool IsNeedDelete() const;
  bool IsHandling() const;
  bool IsNeedFlush(size_t kv_flush_last_size);

  size_t Allocate_times() const { return allocate_times.load(); }
  size_t Resv_ok_times() const { return allocate_ok_times.load(); }

 private:
  const size_t current_mem_block_max_size_;
  const size_t kSortedReservedKVSize_;

  // The amount of kv data stored in the current memory block Used size
  std::atomic<size_t> current_block_kv_size_{0};
  // The amount of data deleted from the current memory block
  std::atomic<size_t> current_block_deleted_size_{0};
  char* sorted_resevered_kv_;  // Contiguous memory block space for storing
                               // sorted kv and reservered kv
  std::atomic<size_t> sorted_re_kv_offset_{0};
  std::atomic<size_t> data_ok_offset_{0};

  std::atomic<size_t> allocate_times{0};
  std::atomic<size_t> allocate_ok_times{0};

  std::atomic<MemBlockState> current_block_state_{MEMORY_BLOCK_IS_INITING};

  Slice current_block_key_start_;
  Slice current_block_key_end_;

  AtomicLinkedList<void*> kv_tmp_list;
  AtomicLinkedList<AddKVEvent> add_kv_OK_list_;
  AtomicLinkedList<void*> node_push_list;
};

}  // namespace rocksdb
